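Flows: Event-Driven Workflows
Overview
Flows is CrewAI's event-driven workflow system for building production-grade automation that needs fine-grained control. Unlike the autonomous collaboration of Crews, Flows gives you deterministic execution paths and explicit state management.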

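Crews vs Flows

| Feature | Crews | Flows |
|---|---|---|
| Control model | Autonomous collaboration | Fine-grained control |
| Execution path | Decided dynamically | Deterministic |
| State management | Implicit | Explicit |
| Typical use | Open-ended tasks | Complex business processes |
| Debugging | Harder | Easier |

Core decorators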
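As a rough mental model for the decorators in this section, the toy dispatcher below registers methods by the event they wait for and then runs them in order. It is a sketch only and not CrewAI's implementation: `ToyFlow`, `ScoreFlow`, the `_toy_*` attributes, and the rule that every handler takes a `payload` argument are all assumptions made for illustration.

```python
from collections import defaultdict, deque


def start():
    """Toy stand-in: mark a method as an entry point."""
    def decorator(func):
        func._toy_start = True
        return func
    return decorator


def listen(trigger):
    """Toy stand-in: run the method after `trigger` (a method or a label) fires."""
    event = trigger if isinstance(trigger, str) else trigger.__name__

    def decorator(func):
        func._toy_trigger = event
        return func
    return decorator


def router(trigger):
    """Toy stand-in: like listen, but the return value becomes the next event name."""
    def decorator(func):
        func = listen(trigger)(func)
        func._toy_router = True
        return func
    return decorator


class ToyFlow:
    def kickoff(self):
        # Class bodies keep definition order, so handlers run in a stable order.
        methods = [m for m in vars(type(self)).values() if callable(m)]
        listeners = defaultdict(list)
        for m in methods:
            if hasattr(m, "_toy_trigger"):
                listeners[m._toy_trigger].append(m)

        trace, last, queue = [], None, deque()
        for m in methods:
            if getattr(m, "_toy_start", False):
                last = m(self)
                trace.append(m.__name__)
                queue.append((m.__name__, last))

        while queue:
            event, payload = queue.popleft()
            for m in listeners[event]:
                last = m(self, payload)
                trace.append(m.__name__)
                # A router emits its return value as the event; others emit their own name.
                next_event = last if getattr(m, "_toy_router", False) else m.__name__
                queue.append((next_event, last))

        self.trace = trace
        return last


class ScoreFlow(ToyFlow):
    @start()
    def fetch(self):
        return {"score": 85}

    @router(fetch)
    def grade(self, payload):
        return "high" if payload["score"] >= 80 else "low"

    @listen("high")
    def praise(self, label):
        return "Excellent!"

    @listen("low")
    def encourage(self, label):
        return "Keep trying!"


flow = ScoreFlow()
result = flow.kickoff()
print(flow.trace, result)
```

The point of the sketch is the wiring: a start method emits an event named after itself, a listener waits for one event, and a router turns its return value into the name of the next event.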
@start - entry point
Marks the method where the flow begins:

```python
from crewai.flow.flow import Flow, start


class MyFlow(Flow):
    @start()
    def begin(self):
        """Entry point of the flow."""
        return {"status": "started"}
```

@listen - listener
Runs a method after another method finishes and receives that method's output:

```python
from crewai.flow.flow import Flow, start, listen


class MyFlow(Flow):
    @start()
    def fetch_data(self):
        return {"data": [1, 2, 3]}

    @listen(fetch_data)
    def process_data(self, data):
        """Runs after fetch_data completes; `data` is its return value."""
        return {"processed": sum(data["data"])}
```

@router - router
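Routes execution to one of several branches. The router returns a string label, and the method that listens for that label runs next:

```python
from crewai.flow.flow import Flow, start, listen, router


class MyFlow(Flow):
    @start()
    def check_status(self):
        # Store the score in state so the router can read it.
        self.state["score"] = 85
        return self.state["score"]

    @router(check_status)
    def route_decision(self):
        """Pick a branch label based on the score."""
        if self.state["score"] >= 80:
            return "high_score"
        elif self.state["score"] >= 60:
            return "medium_score"
        return "low_score"

    @listen("high_score")
    def handle_high(self):
        return "Excellent!"

    @listen("medium_score")
    def handle_medium(self):
        return "Good job!"

    @listen("low_score")
    def handle_low(self):
        return "Keep trying!"
```

Conditional operators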
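The two operators in this section differ only in when their condition counts as met. As a minimal sketch of that difference, the stdlib-only code below tracks which events have been seen so far; `AnyOf` and `AllOf` are hypothetical stand-ins, not CrewAI classes, and the sketch says nothing about how often a real listener fires.

```python
class AnyOf:
    """Condition met as soon as one of the named events has been seen (or_-like)."""

    def __init__(self, *events):
        self.events = set(events)

    def ready(self, seen):
        return bool(self.events & seen)


class AllOf:
    """Condition met only once every named event has been seen (and_-like)."""

    def __init__(self, *events):
        self.events = set(events)

    def ready(self, seen):
        return self.events <= seen


either = AnyOf("task_a", "task_b")
both = AllOf("task_a", "task_b")

seen = set()
seen.add("task_a")
after_a = (either.ready(seen), both.ready(seen))  # (True, False)

seen.add("task_b")
after_b = (either.ready(seen), both.ready(seen))  # (True, True)

print(after_a, after_b)
```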
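or_ - any condition
Triggers when any one of the listed conditions is met:

```python
from crewai.flow.flow import Flow, start, listen, or_


class MyFlow(Flow):
    @start()
    def task_a(self):
        return "A done"

    @start()
    def task_b(self):
        return "B done"

    @listen(or_("task_a", "task_b"))
    def handle_either(self):
        """Triggered when task_a or task_b completes."""
        pass
```

and_ - all conditions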
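Triggers only after all of the listed conditions are met:

```python
from crewai.flow.flow import Flow, start, listen, and_


class MyFlow(Flow):
    @start()
    def task_a(self):
        return "A done"

    @start()
    def task_b(self):
        return "B done"

    @listen(and_("task_a", "task_b"))
    def handle_both(self):
        """Triggered once both task_a and task_b have completed."""
        pass
```

State management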
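The two state styles in this section trade convenience for safety. The sketch below illustrates one reason a fixed set of fields is usually preferred: with a plain dictionary a misspelled key is accepted silently, while an object with declared fields rejects it. It uses only the standard library (`__slots__`) as a stand-in for a Pydantic model, and `CounterState` is a hypothetical name; Pydantic models additionally validate field types when the model is constructed.

```python
# Dictionary state: a typo silently creates a new key.
loose_state = {"counter": 0}
loose_state["conuter"] = 1  # misspelled on purpose; no error is raised


class CounterState:
    """Stand-in for a typed state model: only declared fields may be set."""

    __slots__ = ("counter", "history")

    def __init__(self):
        self.counter = 0
        self.history = []


typed_state = CounterState()
typed_state.counter += 1

try:
    typed_state.conuter = 1  # same typo; rejected because the field is not declared
    typo_caught = False
except AttributeError:
    typo_caught = True

print(loose_state, typed_state.counter, typo_caught)
```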
Basic state

```python
from crewai.flow.flow import Flow, start, listen


class MyFlow(Flow):
    @start()
    def init_state(self):
        self.state["counter"] = 0
        return self.state

    @listen(init_state)
    def increment(self):
        self.state["counter"] += 1
        return self.state
```

Structured state (recommended)
Define the state as a Pydantic model to get type-safe fields:

```python
from pydantic import BaseModel
from crewai.flow.flow import Flow, start, listen


class AnalysisState(BaseModel):
    """Type-safe state definition."""
    data: list = []
    processed: bool = False
    result: str = ""
    confidence: float = 0.0


class AnalysisFlow(Flow[AnalysisState]):
    @start()
    def fetch_data(self):
        self.state.data = [1, 2, 3, 4, 5]
        return self.state

    @listen(fetch_data)
    def process(self):
        self.state.result = str(sum(self.state.data))
        self.state.processed = True
        self.state.confidence = 0.95
        return self.state
```

Integrating Crews
Flows become most useful when individual steps delegate work to Crews:

```python
from crewai.flow.flow import Flow, start, listen, router, or_
from crewai import Crew, Agent, Task, Process
from pydantic import BaseModel


class MarketState(BaseModel):
    sentiment: str = "neutral"
    confidence: float = 0.0
    recommendations: list = []


class MarketAnalysisFlow(Flow[MarketState]):
    @start()
    def fetch_market_data(self):
        """Fetch market data."""
        self.state.sentiment = "analyzing"
        return {"sector": "tech", "timeframe": "1W"}

    @listen(fetch_market_data)
    def analyze_with_crew(self, market_data):
        """Run the analysis with a Crew."""
        analyst = Agent(
            role="Senior Market Analyst",
            goal="Conduct deep market analysis",
            backstory="Veteran analyst known for identifying patterns",
        )
        researcher = Agent(
            role="Data Researcher",
            goal="Gather and validate market data",
            backstory="Expert at correlating data sources",
        )
        analysis_task = Task(
            # {sector} and {timeframe} are filled in from the kickoff inputs.
            description="Analyze {sector} sector for {timeframe}",
            expected_output="Detailed analysis with confidence score",
            agent=analyst,
        )
        research_task = Task(
            description="Validate the analysis with data",
            expected_output="Supporting evidence",
            agent=researcher,
        )
        crew = Crew(
            agents=[analyst, researcher],
            tasks=[analysis_task, research_task],
            process=Process.sequential,
            verbose=True,
        )
        result = crew.kickoff(inputs=market_data)
        self.state.confidence = 0.85  # placeholder: extract the real value from result
        return result

    @router(analyze_with_crew)
    def determine_action(self):
        """Choose the next step from the confidence score."""
        if self.state.confidence > 0.8:
            return "high_confidence"
        elif self.state.confidence > 0.5:
            return "medium_confidence"
        return "low_confidence"

    @listen("high_confidence")
    def execute_strategy(self):
        """High confidence: execute the strategy."""
        strategist = Agent(
            role="Strategy Expert",
            goal="Develop strategy",
            # Illustrative backstory; Agent expects one alongside role and goal.
            backstory="Planner who turns analysis into concrete steps",
        )
        strategy_crew = Crew(
            agents=[strategist],
            tasks=[Task(
                description="Create action plan",
                expected_output="Step-by-step plan",
                agent=strategist,
            )],
        )
        return strategy_crew.kickoff()

    @listen(or_("medium_confidence", "low_confidence"))
    def gather_more_data(self):
        """Low or medium confidence: collect more data."""
        self.state.recommendations.append("Gather more data")
        return "Additional analysis required"
```

Flow diagram examples
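Each of the three shapes in this section (linear, branching, parallel) can be written down as a small edge list. As a sketch, the hypothetical helper `to_mermaid` below turns such a list into Mermaid flowchart text; it is unrelated to CrewAI's own plotting, and the node names are simply the method names used in the examples of this section.

```python
def to_mermaid(edges):
    """Render (source, target, label) edges as Mermaid flowchart text."""
    lines = ["flowchart TD"]
    for source, target, label in edges:
        arrow = f"-->|{label}|" if label else "-->"
        lines.append(f"    {source} {arrow} {target}")
    return "\n".join(lines)


# Linear: each step listens to the previous one.
linear = [("step1", "step2", None), ("step2", "step3", None)]

# Branching: the router's return label selects the branch.
branching = [
    ("evaluate", "route", None),
    ("route", "high_branch", "high"),
    ("route", "low_branch", "low"),
]

# Parallel: two listeners on the same method, joined by a final merge step.
parallel = [
    ("begin", "task_a", None),
    ("begin", "task_b", None),
    ("task_a", "merge", None),
    ("task_b", "merge", None),
]

for name, edges in [("linear", linear), ("branching", branching), ("parallel", parallel)]:
    print(f"%% {name}")
    print(to_mermaid(edges))
```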
Simple linear flow

```python
from crewai.flow.flow import Flow, start, listen


class LinearFlow(Flow):
    @start()
    def step1(self):
        return "Step 1 done"

    @listen(step1)
    def step2(self, result):
        return "Step 2 done"

    @listen(step2)
    def step3(self, result):
        return "Final result"
```

Branching flow

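```python
from crewai.flow.flow import Flow, start, listen, router


class BranchingFlow(Flow):
    @start()
    def evaluate(self):
        # Store the value in state so the router can read it.
        self.state["value"] = 75
        return self.state["value"]

    @router(evaluate)
    def route(self):
        if self.state["value"] > 80:
            return "high"
        return "low"

    @listen("high")
    def high_branch(self):
        return "High path"

    @listen("low")
    def low_branch(self):
        return "Low path"
```

Parallel flow
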
```python
from crewai.flow.flow import Flow, start, listen, and_


class ParallelFlow(Flow):
    @start()
    def begin(self):
        return "Start"

    @listen(begin)
    def task_a(self, _):
        return "A done"

    @listen(begin)
    def task_b(self, _):
        return "B done"

    @listen(and_(task_a, task_b))
    def merge(self):
        return "Both completed"
```

Running a flow
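A flow can be started from synchronous code or from inside a coroutine. One common way for a library to offer both entry points is to make the synchronous method a thin wrapper around the asynchronous one; the sketch below shows that pattern with the standard library only. `ToyRunner` is hypothetical, and whether CrewAI implements `kickoff` exactly this way is an assumption.

```python
import asyncio


class ToyRunner:
    async def kickoff_async(self, inputs=None):
        await asyncio.sleep(0)  # stands in for awaiting real work
        return {"inputs": inputs or {}, "status": "done"}

    def kickoff(self, inputs=None):
        # Synchronous wrapper: asyncio.run starts a fresh event loop, so this
        # only works when no loop is already running in the current thread.
        return asyncio.run(self.kickoff_async(inputs))


# From ordinary synchronous code, call the wrapper.
sync_result = ToyRunner().kickoff({"topic": "AI"})


async def main():
    # Inside a coroutine, await the async variant instead.
    return await ToyRunner().kickoff_async({"topic": "AI"})


async_result = asyncio.run(main())
print(sync_result == async_result)
```

Under this pattern, calling the synchronous wrapper from code that is already running inside an event loop raises a `RuntimeError`, which is why an awaitable variant is useful.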
Synchronous execution

```python
flow = MyFlow()
result = flow.kickoff()
print(result)
```

Asynchronous execution

```python
import asyncio


async def main():
    flow = MyFlow()
    result = await flow.kickoff_async()
    print(result)


asyncio.run(main())
```

Passing inputs

```python
flow = MyFlow()
# The inputs are made available to the flow through its state.
result = flow.kickoff(inputs={"topic": "AI", "date": "2025-01-01"})
```

Visualization
CrewAI can render a Flow as an interactive diagram:

```python
flow = MyFlow()
flow.plot()  # writes an interactive HTML diagram of the flow
```

Best practices
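1. State design

```python
from pydantic import BaseModel
from crewai.flow.flow import Flow


# Recommended: define state as a Pydantic model
class TaskState(BaseModel):
    input_data: dict = {}
    intermediate_results: list = []
    final_output: str = ""
    errors: list = []


class MyFlow(Flow[TaskState]):
    pass
```

2. Error handling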
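
```python
from crewai.flow.flow import Flow, start, listen, router


class RobustFlow(Flow):
    def external_api_call(self):
        """Hypothetical placeholder for a call that may fail."""
        raise NotImplementedError

    @start()
    def risky_operation(self):
        try:
            self.state["data"] = self.external_api_call()
            self.state["success"] = True
        except Exception as e:
            # Record the failure in state so the router can branch on it.
            self.state["success"] = False
            self.state["error"] = str(e)
        return self.state["success"]

    @router(risky_operation)
    def handle_result(self):
        if self.state.get("success"):
            return "success_path"
        return "error_path"

    @listen("error_path")
    def handle_error(self):
        # Retry logic or notification goes here.
        pass
```

3. Modular design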
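
```python
from crewai.flow.flow import Flow, start


# Keep complex logic in separate helper methods
class ModularFlow(Flow):
    def _validate_input(self, data):
        """Check that the required fields are present."""
        return all(k in data for k in ["required_field"])

    def _transform_data(self, data):
        """Upper-case string values and leave other values unchanged."""
        return {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}

    @start()
    def process(self):
        if not self._validate_input(self.state):
            raise ValueError("Invalid input")
        return self._transform_data(self.state)
```

Comparison with LangGraph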
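
| Feature | CrewAI Flows | LangGraph |
|---|---|---|
| Syntax | Decorators | Graph definition |
| State | Pydantic/dict | TypedDict |
| Routing | @router + return value | Conditional edges |
| Parallelism | and_/or_ | Branch nodes |
| Learning curve | Lower | Higher |
| Flexibility | Medium | High |
| Debugging | Easier | Harder |

Code comparison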
CrewAI Flow:

```python
from crewai.flow.flow import Flow, start, listen


class MyFlow(Flow):
    @start()
    def step1(self):
        return "data"

    @listen(step1)
    def step2(self, data):
        return f"processed: {data}"
```

LangGraph:
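
```python
from typing import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict, total=False):
    data: str
    result: str


def step1(state: State):
    return {"data": "data"}


def step2(state: State):
    return {"result": f"processed: {state['data']}"}


graph = StateGraph(State)
graph.add_node("step1", step1)
graph.add_node("step2", step2)
graph.add_edge(START, "step1")
graph.add_edge("step1", "step2")
graph.add_edge("step2", END)
app = graph.compile()
```

Next section: 17.5 Framework Comparison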